AddIns and Scripting
Roslyn .NET Scripting is an easy way to write custom functionality with C# scripts that are compiled during startup.
You can either write inline scripts in Foopipes.yml or load .csx scripts from file or an url.
Addins
Addins are scripts that are loaded, compiled and run at startup. Typically an addin register new service types and tasks which then are available for the pipelines.
addins:
- url: "https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/master/Mailgun/mailgun.csx"
- script: |
PipelineTask("nospaces").Json((context, json, ct) =>
{
json.Data["name"] = json.Data["name"].Replace(" ", "");
return json;
});
services:
mailgun:
type: mailgun
apiBaseUrl: https://api.mailgun.net/v3/sandbox5ded26xxxxxxxxxxxxb8.mailgun.org
apiKey: key-3a56bxxxxxxxxxxxxxxx5c
defaultFrom: me@mydomain.com
pipelines:
-
when:
- queue: started
from:
- http: "https://jsonplaceholder.typicode.com/posts"
do:
- nospaces
to:
- log
error:
- { mailgun.send, to: me@mydomain.com, subject: Error, text: An error occured }
Community Addins
Addins created by the community is available as a public repository on Github: https://github.com/AreteraAB/Foopipes.Addins.
When loading a community Addin, use this url format: https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/master/Mailgun/mailgun.csx.
Consider using a tag or commit hash instead of the latest version: https://raw.githubusercontent.com/AreteraAB/Foopipes.Addins/47e1e6d74f2546673b74f929f9ebf74ca56afae5/Tail/Tail.csx.
Pull requests are welcome!
Registering tasks
Register custom tasks by passing a callback to the method PipelineTask(string name)
.
Pipeline tasks can be json and/or binary or dynamic, depending what kind of data they're able to handle.
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
// Do something here with json data
return json;
});
PipelineTask("mytask").Binary(async (context, data, cancellationToken) =>
{
// Do something here with binary data
return data;
});
Registering services
Custom services is the best way to keep state. You can either register a service type which later can be referenced in the configuration file, or a service instance which will be a named singleton.
using Foopipes.Abstractions.Services;
class MyService : ServiceBase
{
public string MyConfigValue => Config["myConfigValue"];
private int _counter = 0;
public int IncrementCounter()
{
return Interlocked.Increment(ref _counter);
}
}
Service.Register("myserviceType", typeof(MyService));
Create an instance and configure your service in the configuration file:
services:
myserviceInstanceName:
type: myserviceType
myConfigValue: hello
In your tasks, you can get hold of a service instance like this:
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
var service = await context.GetService<MyService>(defaultName: "myserviceInstanceName");
json.Data["counter"] = service.IncrementCounter();
return json;
});
Observer/observable pattern
If your service implements IObservableService
you can emit events that triggers pipelines. Very powerful combined with IRunnableService
and System.Reactive.
#r "System.Reactive"
using System.Reactive.Subjects;
using Foopipes.Abstractions.Services;
class MyObservableService : ServiceBase, IObservableService, IRunnableService
{
private Subject<ServiceEvent> _subject = new Subject<ServiceEvent>();
// IObservableService
public IObservable<ServiceEvent> Observable => _subject;
// IRunnableService
public async Task Run(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(1000, cancellationToken);
var metadata = JObject.FromObject(new
{
currentTime = DateTime.Now,
});
var serviceEvent = new ServiceEvent(this, metadata, new[] { new BinaryData(new byte[]{ 0x42} }));
_subject.OnNext(serviceEvent);
}
}
}
Service.Register("myObservableService", typeof(MyObservableService));
services:
myObservableService:
type: myObservableService
pipelines:
-
when:
- myObservableService
to:
- log
Invoking tasks from a task
You can write tasks that invoke other tasks.
PipelineTask("sendGreeting").Json(async (context, json, cancellationToken) =>
{
var data = JObject.FromObject(new
{
greeting = "hello " + await context.GetExpandedConfigValue("name")
});
var config = new Dictionary<string, string>
{
{"url", "https://www.myservice.com/api" },
{"method", "post"},
{"body", "formUrlEncoded"}
};
var r = await context.RunTask("http").WithData(data).WithArguments(config).Invoke(cancellationToken);
return json;
});
Invoke with:
do:
- { sendGreeting, name: "Foo #{lastname}" }
Returning results
Task callbacks can return json and/or binary data.
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
return new JsonData( JObject.FromObject(new { hello="world"}) );
});
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
return new BinaryData(new byte[]{0x42});
});
It's also possible to return multiple results.
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
return new ProcessJsonResult( new[]{
JObject.FromObject(new { hello="world1" }),
JObject.FromObject(new { hello="world2" }),
});
});
Referencing other assemblies
You can reference assemblies using the #r syntax.
#r "System.Security.Cryptography.Csp"
using System.Security.Cryptography;
var _aes = Aes.Create();
PipelineTask("decryptstring").Binary(async (context, binary, cancellationToken) =>
{
using (var decryptor = _aes.CreateDecryptor(key, iv))
{
// etc etc
}
return JObject.FromObject(new { value=decryptedData });
});
Currently it is not possible to reference Nuget assembles.
Data binding
Use context.BindValue(string bindingExpression)
to obtain a value using the data binding functionality.
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
var myvalue = await context.BindValue("#{elasticsearch:myvalue}");
json.Data["boundValue"] = myvalue;
return json;
});
Similary, use context.SetValue(string key, string value)
to set a value.
PipelineTask("mytask").Json(async (context, json, cancellationToken) =>
{
await context.SetValue("elasticsearch:myvalue", "hello world");
return json;
});
Class Reference
Addin host globals:
{
ITaskBuilder PipelineTask(string name);
IServiceBuilder Service { get; }
}
interface ITaskBuilder
{
/************ Async Json ************/
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<IProcessResult>> func);
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JsonData>> func);
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JObject>> func);
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<JObject[]>> func);
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<BinaryData>> func);
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, Task<byte[]>> func);
/************ Non async Json ************/
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, IProcessResult> func);
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JsonData> func);
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JObject> func);
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, JObject[]> func);
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, byte[]> func);
public ITaskBuilder Json(Func<IScriptTaskContext, JsonData, CancellationToken, BinaryData> func);
/************ async Binary************/
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<IProcessResult>> func);
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JsonData>> func);
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JObject>> func);
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<JObject[]>> func);
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<BinaryData>> func);
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, Task<byte[]>> func);
/************ Non async Binary************/
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, IProcessResult> func);
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JsonData> func);
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JObject> func);
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, JObject[]> func);
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, byte[]> func);
public ITaskBuilder Binary(Func<IScriptTaskContext, BinaryData, CancellationToken, BinaryData> func);
/************ Non async Dynamic ************/
public ITaskBuilder Dynamic(Func<IScriptTaskContext, dynamic, CancellationToken, dynamic> func);
/************ async Dynamic ************/
public ITaskBuilder Dynamic(Func<IScriptTaskContext, dynamic, CancellationToken, Task<dynamic>> func);
public ITaskBuilder WithDefaultConfigKey(string defaultConfigKey);
}
interface IServiceBuilder
{
public IServiceBuilder Instance(string name, IService instance);
public IServiceBuilder Register(string typeName, Type serviceType);
}
interface IScriptTaskContext
{
IDictionary<string, string> Config { get; }
ILogger Logger { get; }
ILoggerFactory LoggerFactory { get; }
IPipelineContext PipelineContext { get; }
IServiceProvider ServiceProvider { get; }
Task<string> BindValue(string bindingExpression);
Task<string> GetExpandedConfigValue(string key, bool throwIfNotSet = true);
IService GetService(string name);
IRunTaskBuilder RunTask(string name);
Task SetValue(string key, string val);
}
public static class ScriptTaskContextExtensions
{
public static async Task<TService> GetService<TService>(this IScriptTaskContext context,
string defaultName,
string configKeyName = "service",
bool throwIfNotFound = true) where TService : class;
}
public class JsonData : IProcessResultData
{
public JsonData(JObject jsonData, JObject metadata=null);
public JObject Metadata { get; }
public JObject Data { get; }
public static JsonData Empty { get; }
}
public class BinaryData : IProcessResultData
{
public BinaryData(byte[] binaryData, JObject metadata = null);
public JObject Metadata { get; }
public byte[] Data { get; }
public static BinaryData Empty { get; }
}